package com.hanxiaozhang.example.listener.mgsconsumer;

import com.hanxiaozhang.constant.RocketConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * 〈一句话功能简述〉<br>
 * 〈Assign模式拉取消息〉
 * <p>
 * 它与Subscribe模式不同的是，Assign模式下没有自动的负载均衡机制，需要用户自行指定需要拉取的队列
 *
 * @author hanxinghua
 * @create 2022/10/3
 * @since 1.0.0
 */
@Slf4j
@Component
public class No8LitePullAssignMsgOriginalSyntax {


    /**
     *
     */
    public void pull() {

        Long startTime = System.currentTimeMillis();

        DefaultLitePullConsumer litePullConsumer = null;
        try {
            // 1. 初始化DefaultLitePullConsumer，并启动
            litePullConsumer = new DefaultLitePullConsumer(RocketConstant.LITE_PULL_ASSIGN_CONSUMER_GROUP);
            // 手动提交位点的方式
            litePullConsumer.setAutoCommit(false);
            litePullConsumer.start();
            // 2. 获取Topic下的所有队列，取前面的一半队列进行拉取
            Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues(RocketConstant.LITE_PULL_ASSIGN_TOPIC);
            List<MessageQueue> list = new ArrayList<>(mqSet);
            List<MessageQueue> assignList = new ArrayList<>();
            for (int i = 0; i < list.size() / 2; i++) {
                assignList.add(list.get(i));
            }
            litePullConsumer.assign(assignList);
            // 3. 将第一个队列拉取的位点设置10，从10开始拉去
            litePullConsumer.seek(assignList.get(0), 10);
            while ((System.currentTimeMillis() - startTime) < 10 * 1000) {
                log.info("循环");
                // 4. 循环不停地调用poll方法拉取消息，拉取到消息后调用commitSync方法手动提交位点。
                List<MessageExt> messageExts = litePullConsumer.poll();
                messageExts.forEach(x->{
                    try {
                        log.info("{}收到消息：{}", this.getClass().getSimpleName(),  new String(x.getBody(), "utf-8"));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                });
                litePullConsumer.commitSync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (litePullConsumer != null) {
                litePullConsumer.shutdown();
            }
        }
    }

}
